/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.sql.planner.planPrinter;

import com.facebook.presto.operator.DynamicFilterStats;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.util.Mergeable;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.util.MoreMaps.mergeMaps;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.Double.max;
import static java.lang.Math.sqrt;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;

public class PlanNodeStats
        implements Mergeable<PlanNodeStats>
{
    private final PlanNodeId planNodeId;

    private final Duration planNodeScheduledTime;
    private final Duration planNodeCpuTime;
    private final Duration planNodeBlockedWallTime;
    private final Duration planNodeAddInputWallTime;
    private final Duration planNodeGetOutputWallTime;
    private final Duration planNodeFinishWallTime;
    private final long planNodeInputPositions;
    private final DataSize planNodeInputDataSize;
    private final long planNodeRawInputPositions;
    private final DataSize planNodeRawInputDataSize;
    private final long planNodeOutputPositions;
    private final DataSize planNodeOutputDataSize;

    private final DataSize planNodePeakMemorySize;
    protected final Map<String, OperatorInputStats> operatorInputStats;
    private final long planNodeNullJoinBuildKeyCount;
    private final long planNodeJoinBuildKeyCount;
    private final long planNodeNullJoinProbeKeyCount;
    private final long planNodeJoinProbeKeyCount;
    private final Optional<DynamicFilterStats> dynamicFilterStats;

    @JsonCreator
    public PlanNodeStats(
            @JsonProperty("planNodeId") PlanNodeId planNodeId,
            @JsonProperty("planNodeScheduledTime") Duration planNodeScheduledTime,
            @JsonProperty("planNodeCpuTime") Duration planNodeCpuTime,
            @JsonProperty("planNodeBlockedWallTime") Duration planNodeBlockedWallTime,
            @JsonProperty("planNodeAddInputWallTime") Duration planNodeAddInputWallTime,
            @JsonProperty("planNodeGetOutputWallTime") Duration planNodeGetOutputWallTime,
            @JsonProperty("planNodeFinishWallTime") Duration planNodeFinishWallTime,
            @JsonProperty("planNodeInputPositions") long planNodeInputPositions,
            @JsonProperty("planNodeInputDataSize") DataSize planNodeInputDataSize,
            @JsonProperty("planNodeRawInputPositions") long planNodeRawInputPositions,
            @JsonProperty("planNodeRawInputDataSize") DataSize planNodeRawInputDataSize,
            @JsonProperty("planNodeOutputPositions") long planNodeOutputPositions,
            @JsonProperty("planNodeOutputDataSize") DataSize planNodeOutputDataSize,
            @JsonProperty("planNodePeakMemorySize") DataSize planNodePeakMemorySize,
            @JsonProperty("operatorInputStats") Map<String, OperatorInputStats> operatorInputStats,
            @JsonProperty("planNodeNullJoinBuildKeyCount") long planNodeNullJoinBuildKeyCount,
            @JsonProperty("planNodeJoinBuildKeyCount") long planNodeJoinBuildKeyCount,
            @JsonProperty("planNodeNullJoinProbeKeyCount") long planNodeNullJoinProbeKeyCount,
            @JsonProperty("planNodeJoinProbeKeyCount") long planNodeJoinProbeKeyCount,
            @JsonProperty("dynamicFilterStats") Optional<DynamicFilterStats> dynamicFilterStats)
    {
        this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");

        this.planNodeScheduledTime = requireNonNull(planNodeScheduledTime, "planNodeScheduledTime is null");
        this.planNodeCpuTime = requireNonNull(planNodeCpuTime, "planNodeCpuTime is null");
        this.planNodeBlockedWallTime = requireNonNull(planNodeBlockedWallTime, "planNodeBlockedWallTime is null");
        this.planNodeAddInputWallTime = requireNonNull(planNodeAddInputWallTime, "planNodeAddInputWallTime is null");
        this.planNodeGetOutputWallTime = requireNonNull(planNodeGetOutputWallTime, "planNodeGetOutputWallTime is null");
        this.planNodeFinishWallTime = requireNonNull(planNodeFinishWallTime, "planNodeFinishWallTime is null");
        this.planNodeInputPositions = planNodeInputPositions;
        this.planNodeInputDataSize = planNodeInputDataSize;
        this.planNodeRawInputPositions = planNodeRawInputPositions;
        this.planNodeRawInputDataSize = planNodeRawInputDataSize;
        this.planNodeOutputPositions = planNodeOutputPositions;
        this.planNodeOutputDataSize = planNodeOutputDataSize;

        this.operatorInputStats = requireNonNull(operatorInputStats, "operatorInputStats is null");
        this.planNodePeakMemorySize = planNodePeakMemorySize;
        this.planNodeNullJoinBuildKeyCount = planNodeNullJoinBuildKeyCount;
        this.planNodeJoinBuildKeyCount = planNodeJoinBuildKeyCount;
        this.planNodeNullJoinProbeKeyCount = planNodeNullJoinProbeKeyCount;
        this.planNodeJoinProbeKeyCount = planNodeJoinProbeKeyCount;
        this.dynamicFilterStats = dynamicFilterStats;
    }

    private static double computedStdDev(double sumSquared, double sum, long n)
    {
        double average = sum / n;
        double variance = (sumSquared - 2 * sum * average + average * average * n) / n;
        // variance might be negative because of numeric inaccuracy, therefore we need to use max
        return sqrt(max(variance, 0d));
    }

    @JsonProperty
    public PlanNodeId getPlanNodeId()
    {
        return planNodeId;
    }

    @JsonProperty
    public Duration getPlanNodeScheduledTime()
    {
        return planNodeScheduledTime;
    }

    @JsonProperty
    public Duration getPlanNodeCpuTime()
    {
        return planNodeCpuTime;
    }

    @JsonProperty
    public Duration getPlanNodeBlockedWallTime()
    {
        return planNodeBlockedWallTime;
    }

    @JsonProperty
    public Duration getPlanNodeAddInputWallTime()
    {
        return planNodeAddInputWallTime;
    }

    @JsonProperty
    public Duration getPlanNodeGetOutputWallTime()
    {
        return planNodeGetOutputWallTime;
    }

    @JsonProperty
    public Duration getPlanNodeFinishWallTime()
    {
        return planNodeFinishWallTime;
    }

    @JsonProperty
    public Map<String, OperatorInputStats> getOperatorInputStats()
    {
        // no need to copy, just prevent modifications
        return Collections.unmodifiableMap(operatorInputStats);
    }

    public Set<String> getOperatorTypes()
    {
        return operatorInputStats.keySet();
    }

    @JsonProperty
    public long getPlanNodeInputPositions()
    {
        return planNodeInputPositions;
    }

    @JsonProperty
    public DataSize getPlanNodeInputDataSize()
    {
        return planNodeInputDataSize;
    }

    @JsonProperty
    public long getPlanNodeRawInputPositions()
    {
        return planNodeRawInputPositions;
    }

    @JsonProperty
    public DataSize getPlanNodeRawInputDataSize()
    {
        return planNodeRawInputDataSize;
    }

    @JsonProperty
    public long getPlanNodeOutputPositions()
    {
        return planNodeOutputPositions;
    }

    @JsonProperty
    public DataSize getPlanNodeOutputDataSize()
    {
        return planNodeOutputDataSize;
    }

    public Map<String, Double> getOperatorInputPositionsAverages()
    {
        return operatorInputStats.entrySet().stream()
                .collect(toMap(
                        Map.Entry::getKey,
                        entry -> (double) entry.getValue().getInputPositions() / operatorInputStats.get(entry.getKey()).getTotalDrivers()));
    }

    public Map<String, Double> getOperatorInputPositionsStdDevs()
    {
        return operatorInputStats.entrySet().stream()
                .collect(toMap(
                        Map.Entry::getKey,
                        entry -> computedStdDev(
                                entry.getValue().getSumSquaredInputPositions(),
                                entry.getValue().getInputPositions(),
                                entry.getValue().getTotalDrivers())));
    }

    @JsonProperty
    public DataSize getPlanNodePeakMemorySize()
    {
        return planNodePeakMemorySize;
    }

    @JsonProperty
    public long getPlanNodeNullJoinBuildKeyCount()
    {
        return planNodeNullJoinBuildKeyCount;
    }

    @JsonProperty
    public long getPlanNodeJoinBuildKeyCount()
    {
        return planNodeJoinBuildKeyCount;
    }

    @JsonProperty
    public long getPlanNodeNullJoinProbeKeyCount()
    {
        return planNodeNullJoinProbeKeyCount;
    }

    @JsonProperty
    public long getPlanNodeJoinProbeKeyCount()
    {
        return planNodeJoinProbeKeyCount;
    }

    public Optional<DynamicFilterStats> getDynamicFilterStats()
    {
        return dynamicFilterStats;
    }

    public static Optional<DynamicFilterStats> mergeDynamicFilterStats(Optional<DynamicFilterStats> stats1, Optional<DynamicFilterStats> stats2)
    {
        Optional<DynamicFilterStats> optionalDynamicFilterStats = Optional.empty();
        if (stats1.isPresent()) {
            DynamicFilterStats dynamicFilterStats = stats1.get();
            stats2.ifPresent(dynamicFilterStats::mergeWith);
            optionalDynamicFilterStats = Optional.of(dynamicFilterStats);
        }
        else if (stats2.isPresent()) {
            optionalDynamicFilterStats = Optional.of(stats2.get());
        }
        return optionalDynamicFilterStats;
    }

    @Override
    public PlanNodeStats mergeWith(PlanNodeStats other)
    {
        checkArgument(planNodeId.equals(other.getPlanNodeId()), "planNodeIds do not match. %s != %s", planNodeId, other.getPlanNodeId());

        long planNodeInputPositions = this.planNodeInputPositions + other.planNodeInputPositions;
        DataSize planNodeInputDataSize = succinctBytes((long) ((double) this.planNodeInputDataSize.toBytes() + (double) other.planNodeInputDataSize.toBytes()));
        long planNodeRawInputPositions = this.planNodeRawInputPositions + other.planNodeRawInputPositions;
        DataSize planNodeRawInputDataSize = succinctBytes((long) ((double) this.planNodeRawInputDataSize.toBytes() + (double) other.planNodeRawInputDataSize.toBytes()));
        long planNodeOutputPositions = this.planNodeOutputPositions + other.planNodeOutputPositions;
        DataSize planNodeOutputDataSize = succinctBytes((long) ((double) this.planNodeOutputDataSize.toBytes() + (double) other.planNodeOutputDataSize.toBytes()));
        DataSize planNodePeakMemorySize = succinctBytes(Math.max(this.planNodePeakMemorySize.toBytes(), other.planNodePeakMemorySize.toBytes()));

        Map<String, OperatorInputStats> operatorInputStats = mergeMaps(this.operatorInputStats, other.operatorInputStats, OperatorInputStats::merge);
        long planNodeNullJoinBuildKeyCount = this.planNodeNullJoinBuildKeyCount + other.planNodeNullJoinBuildKeyCount;
        long planNodeJoinBuildKeyCount = this.planNodeJoinBuildKeyCount + other.planNodeJoinBuildKeyCount;
        long planNodeNullJoinProbeKeyCount = this.planNodeNullJoinProbeKeyCount + other.planNodeNullJoinProbeKeyCount;
        long planNodeJoinProbeKeyCount = this.planNodeJoinProbeKeyCount + other.planNodeJoinProbeKeyCount;
        Optional<DynamicFilterStats> optionalDynamicFilterStats = mergeDynamicFilterStats(this.dynamicFilterStats, other.dynamicFilterStats);

        return new PlanNodeStats(
                planNodeId,
                new Duration(planNodeScheduledTime.toMillis() + other.getPlanNodeScheduledTime().toMillis(), MILLISECONDS),
                new Duration(planNodeCpuTime.toMillis() + other.getPlanNodeCpuTime().toMillis(), MILLISECONDS),
                new Duration(planNodeBlockedWallTime.toMillis() + other.getPlanNodeBlockedWallTime().toMillis(), MILLISECONDS),
                new Duration(planNodeAddInputWallTime.toMillis() + other.getPlanNodeAddInputWallTime().toMillis(), MILLISECONDS),
                new Duration(planNodeGetOutputWallTime.toMillis() + other.getPlanNodeGetOutputWallTime().toMillis(), MILLISECONDS),
                new Duration(planNodeFinishWallTime.toMillis() + other.getPlanNodeFinishWallTime().toMillis(), MILLISECONDS),
                planNodeInputPositions, planNodeInputDataSize,
                planNodeRawInputPositions, planNodeRawInputDataSize,
                planNodeOutputPositions, planNodeOutputDataSize,
                planNodePeakMemorySize,
                operatorInputStats,

                planNodeNullJoinBuildKeyCount,
                planNodeJoinBuildKeyCount,
                planNodeNullJoinProbeKeyCount,
                planNodeJoinProbeKeyCount,
                optionalDynamicFilterStats);
    }
}
